/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.channel;

import io.netty.buffer.ByteBufAllocator;
import io.netty.util.*;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.OrderedEventExecutor;
import io.netty.util.internal.*;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static io.netty.channel.ChannelHandlerMask.*;

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
  
  /** Logger used by this class for failures that are reported rather than propagated. */
  private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
  // Neighbouring contexts in the pipeline: inbound lookups follow 'next', outbound lookups follow 'prev'.
  volatile AbstractChannelHandlerContext next;
  volatile AbstractChannelHandlerContext prev;

  /** Atomic updater for {@link #handlerState}; used for the compare-and-set state transitions. */
  private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
      AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

  /**
   * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
   */
  private static final int ADD_PENDING = 1;
  /**
   * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
   */
  private static final int ADD_COMPLETE = 2;
  /**
   * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
   */
  private static final int REMOVE_COMPLETE = 3;
  /**
   * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
   * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
   */
  private static final int INIT = 0;

  // Pipeline this context belongs to.
  private final DefaultChannelPipeline pipeline;
  // Name of this context, returned by name().
  private final String name;
  // True when no child executor is set or the child executor is an OrderedEventExecutor (see constructor).
  private final boolean ordered;
  // Bit mask computed from the handler class; matched against event masks when searching for a target context.
  private final int executionMask;

  // Will be set to null if no child executor should be used, otherwise it will be set to the
  // child executor.
  final EventExecutor executor;
  // Lazily created by newSucceededFuture() and reused afterwards.
  private ChannelFuture succeededFuture;

  // Lazily instantiated tasks used to trigger events to a handler with different executor.
  // There is no need to make this volatile as at worst it will just create a few more instances than needed.
  private Tasks invokeTasks;

  // Lifecycle state of the handler: one of INIT, ADD_PENDING, ADD_COMPLETE or REMOVE_COMPLETE.
  private volatile int handlerState = INIT;
  
  AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.executionMask = mask(handlerClass);
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
  }
  
  /** Returns the channel of the pipeline this context belongs to. */
  @Override
  public Channel channel() {
    final Channel owner = pipeline.channel();
    return owner;
  }
  
  /** Returns the pipeline this context was created for. */
  @Override
  public ChannelPipeline pipeline() {
    return this.pipeline;
  }
  
  /** Returns the allocator configured on this context's channel. */
  @Override
  public ByteBufAllocator alloc() {
    final ChannelConfig config = channel().config();
    return config.getAllocator();
  }
  
  /**
   * Returns the executor used for this context: the child executor when one was supplied,
   * otherwise the channel's event loop.
   */
  @Override
  public EventExecutor executor() {
    return executor == null ? channel().eventLoop() : executor;
  }
  
  /** Returns the name given to this context at construction time. */
  @Override
  public String name() {
    return this.name;
  }
  
  /**
   * Forwards a channelRegistered event to the next context (following {@code next}) whose
   * execution mask includes {@code MASK_CHANNEL_REGISTERED}.
   */
  @Override
  public ChannelHandlerContext fireChannelRegistered() {
    // MASK_CHANNEL_REGISTERED is the bit that marks handlers interested in channelRegistered.
    final AbstractChannelHandlerContext target = findContextInbound(MASK_CHANNEL_REGISTERED);
    invokeChannelRegistered(target);
    return this;
  }
  
  /**
   * Delivers channelRegistered to {@code next}, directly when the caller is already running in
   * {@code next}'s executor and as a submitted task otherwise.
   */
  static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
      // Wrong thread: hand the call over to the target's executor.
      nextExecutor.execute(new Runnable() {
        @Override
        public void run() {
          next.invokeChannelRegistered();
        }
      });
      return;
    }
    next.invokeChannelRegistered();
  }
  
  /**
   * Calls the handler's channelRegistered callback, or forwards the event further when
   * {@link #invokeHandler()} says the handler must not be invoked yet.
   */
  private void invokeChannelRegistered() {
    if (!invokeHandler()) {
      // Skip this handler and keep the event moving.
      fireChannelRegistered();
      return;
    }
    try {
      final ChannelInboundHandler inbound = (ChannelInboundHandler) handler();
      inbound.channelRegistered(this);
    } catch (Throwable failure) {
      notifyHandlerException(failure);
    }
  }
  
  /**
   * Forwards a channelUnregistered event to the next context whose execution mask includes
   * {@code MASK_CHANNEL_UNREGISTERED}.
   */
  @Override
  public ChannelHandlerContext fireChannelUnregistered() {
    final AbstractChannelHandlerContext target = findContextInbound(MASK_CHANNEL_UNREGISTERED);
    invokeChannelUnregistered(target);
    return this;
  }
  
  /**
   * Delivers channelUnregistered to {@code next}, directly when already in its executor and as
   * a submitted task otherwise.
   */
  static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
      // Wrong thread: hand the call over to the target's executor.
      nextExecutor.execute(new Runnable() {
        @Override
        public void run() {
          next.invokeChannelUnregistered();
        }
      });
      return;
    }
    next.invokeChannelUnregistered();
  }
  
  /**
   * Calls the handler's channelUnregistered callback, or forwards the event when the handler
   * must not be invoked yet.
   */
  private void invokeChannelUnregistered() {
    if (!invokeHandler()) {
      fireChannelUnregistered();
      return;
    }
    try {
      final ChannelInboundHandler inbound = (ChannelInboundHandler) handler();
      inbound.channelUnregistered(this);
    } catch (Throwable failure) {
      notifyHandlerException(failure);
    }
  }
  
  /**
   * Forwards a channelActive event to the next context whose execution mask includes
   * {@code MASK_CHANNEL_ACTIVE}.
   */
  @Override
  public ChannelHandlerContext fireChannelActive() {
    final AbstractChannelHandlerContext target = findContextInbound(MASK_CHANNEL_ACTIVE);
    invokeChannelActive(target);
    return this;
  }
  
  /**
   * Delivers channelActive to {@code next}, directly when already in its executor and as a
   * submitted task otherwise.
   */
  static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
      // Wrong thread: hand the call over to the target's executor.
      nextExecutor.execute(new Runnable() {
        @Override
        public void run() {
          next.invokeChannelActive();
        }
      });
      return;
    }
    next.invokeChannelActive();
  }
  
  /**
   * Calls the handler's channelActive callback, or forwards the event when the handler must
   * not be invoked yet.
   */
  private void invokeChannelActive() {
    if (!invokeHandler()) {
      fireChannelActive();
      return;
    }
    try {
      final ChannelInboundHandler inbound = (ChannelInboundHandler) handler();
      inbound.channelActive(this);
    } catch (Throwable failure) {
      notifyHandlerException(failure);
    }
  }
  
  /**
   * Forwards a channelInactive event to the next context whose execution mask includes
   * {@code MASK_CHANNEL_INACTIVE}.
   */
  @Override
  public ChannelHandlerContext fireChannelInactive() {
    final AbstractChannelHandlerContext target = findContextInbound(MASK_CHANNEL_INACTIVE);
    invokeChannelInactive(target);
    return this;
  }
  
  /**
   * Delivers channelInactive to {@code next}, directly when already in its executor and as a
   * submitted task otherwise.
   */
  static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
      // Wrong thread: hand the call over to the target's executor.
      nextExecutor.execute(new Runnable() {
        @Override
        public void run() {
          next.invokeChannelInactive();
        }
      });
      return;
    }
    next.invokeChannelInactive();
  }
  
  /**
   * Calls the handler's channelInactive callback, or forwards the event when the handler must
   * not be invoked yet.
   */
  private void invokeChannelInactive() {
    if (!invokeHandler()) {
      fireChannelInactive();
      return;
    }
    try {
      final ChannelInboundHandler inbound = (ChannelInboundHandler) handler();
      inbound.channelInactive(this);
    } catch (Throwable failure) {
      notifyHandlerException(failure);
    }
  }
  
  /**
   * Forwards {@code cause} to the next context whose execution mask includes
   * {@code MASK_EXCEPTION_CAUGHT}.
   */
  @Override
  public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
    final AbstractChannelHandlerContext target = findContextInbound(MASK_EXCEPTION_CAUGHT);
    invokeExceptionCaught(target, cause);
    return this;
  }
  
  /**
   * Delivers an exceptionCaught event to {@code next}. When the event has to be submitted to
   * another executor and the submission itself fails, both failures are logged at WARN level
   * instead of being thrown.
   *
   * @param next  the context that should receive the event
   * @param cause the exception to report; must not be {@code null}
   */
  static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
    ObjectUtil.checkNotNull(cause, "cause");
    final EventExecutor nextExecutor = next.executor();
    if (nextExecutor.inEventLoop()) {
      next.invokeExceptionCaught(cause);
      return;
    }
    try {
      nextExecutor.execute(new Runnable() {
        @Override
        public void run() {
          next.invokeExceptionCaught(cause);
        }
      });
    } catch (Throwable submitFailure) {
      // Nothing further can be notified here, so just record what happened.
      if (logger.isWarnEnabled()) {
        logger.warn("Failed to submit an exceptionCaught() event.", submitFailure);
        logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
      }
    }
  }
  
  /**
   * Calls the handler's exceptionCaught callback with {@code cause}, or forwards the event when
   * the handler must not be invoked yet. An exception thrown by the callback itself is only
   * logged (DEBUG with full stack trace, otherwise WARN).
   */
  private void invokeExceptionCaught(final Throwable cause) {
    if (!invokeHandler()) {
      fireExceptionCaught(cause);
      return;
    }
    try {
      handler().exceptionCaught(this, cause);
    } catch (Throwable error) {
      if (logger.isDebugEnabled()) {
        final String errorTrace = ThrowableUtil.stackTraceToString(error);
        logger.debug(
            "An exception {}" +
                "was thrown by a user handler's exceptionCaught() " +
                "method while handling the following exception:",
            errorTrace, cause);
      } else if (logger.isWarnEnabled()) {
        logger.warn(
            "An exception '{}' [enable DEBUG level for full stacktrace] " +
                "was thrown by a user handler's exceptionCaught() " +
                "method while handling the following exception:", error, cause);
      }
    }
  }
  
  /**
   * Forwards a user event to the next context whose execution mask includes
   * {@code MASK_USER_EVENT_TRIGGERED}.
   */
  @Override
  public ChannelHandlerContext fireUserEventTriggered(final Object event) {
    final AbstractChannelHandlerContext target = findContextInbound(MASK_USER_EVENT_TRIGGERED);
    invokeUserEventTriggered(target, event);
    return this;
  }
  
  /**
   * Delivers a user event to {@code next}, directly when already in its executor and as a
   * submitted task otherwise.
   *
   * @param next  the context that should receive the event
   * @param event the event object; must not be {@code null}
   */
  static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
    ObjectUtil.checkNotNull(event, "event");
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
      nextExecutor.execute(new Runnable() {
        @Override
        public void run() {
          next.invokeUserEventTriggered(event);
        }
      });
      return;
    }
    next.invokeUserEventTriggered(event);
  }
  
  /**
   * Calls the handler's userEventTriggered callback, or forwards the event when the handler
   * must not be invoked yet.
   */
  private void invokeUserEventTriggered(Object event) {
    if (!invokeHandler()) {
      fireUserEventTriggered(event);
      return;
    }
    try {
      final ChannelInboundHandler inbound = (ChannelInboundHandler) handler();
      inbound.userEventTriggered(this, event);
    } catch (Throwable failure) {
      notifyHandlerException(failure);
    }
  }
  
  /**
   * Fires a channelRead event: forwards {@code msg} to the next context whose execution mask
   * includes {@code MASK_CHANNEL_READ}.
   */
  @Override
  public ChannelHandlerContext fireChannelRead(final Object msg) {
    // findContextInbound(...) walks the 'next' links to locate the receiving context.
    final AbstractChannelHandlerContext target = findContextInbound(MASK_CHANNEL_READ);
    invokeChannelRead(target, msg);
    return this;
  }
  
  /**
   * Delivers a channelRead event carrying {@code msg} to {@code next}.
   *
   * @param next the context that should receive the message
   * @param msg  the message; must not be {@code null}
   */
  static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final DefaultChannelPipeline owner = next.pipeline;
    // The pipeline's touch(...) result is what gets delivered from here on.
    final Object touched = owner.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    final EventExecutor nextExecutor = next.executor();
    // Are we already running inside the target's executor?
    if (!nextExecutor.inEventLoop()) {
      // No: submit the delivery as a task to that executor.
      nextExecutor.execute(new Runnable() {
        @Override
        public void run() {
          next.invokeChannelRead(touched);
        }
      });
      return;
    }
    next.invokeChannelRead(touched);
  }
  
  /**
   * Calls the handler's channelRead callback with {@code msg}, or forwards the message when the
   * handler must not be invoked yet.
   */
  private void invokeChannelRead(Object msg) {
    if (!invokeHandler()) {
      fireChannelRead(msg);
      return;
    }
    try {
      final ChannelInboundHandler inbound = (ChannelInboundHandler) handler();
      inbound.channelRead(this, msg);
    } catch (Throwable failure) {
      notifyHandlerException(failure);
    }
  }
  
  /**
   * Forwards a channelReadComplete event to the next context whose execution mask includes
   * {@code MASK_CHANNEL_READ_COMPLETE}.
   */
  @Override
  public ChannelHandlerContext fireChannelReadComplete() {
    final AbstractChannelHandlerContext target = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
    invokeChannelReadComplete(target);
    return this;
  }
  
  /**
   * Delivers channelReadComplete to {@code next}. When a hop to another executor is needed, the
   * reusable task cached on {@code next} is submitted (the cache is created on first use).
   */
  static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (nextExecutor.inEventLoop()) {
      next.invokeChannelReadComplete();
      return;
    }
    Tasks cached = next.invokeTasks;
    if (cached == null) {
      cached = new Tasks(next);
      next.invokeTasks = cached;
    }
    nextExecutor.execute(cached.invokeChannelReadCompleteTask);
  }
  
  /**
   * Calls the handler's channelReadComplete callback, or forwards the event when the handler
   * must not be invoked yet.
   */
  private void invokeChannelReadComplete() {
    if (!invokeHandler()) {
      fireChannelReadComplete();
      return;
    }
    try {
      final ChannelInboundHandler inbound = (ChannelInboundHandler) handler();
      inbound.channelReadComplete(this);
    } catch (Throwable failure) {
      notifyHandlerException(failure);
    }
  }
  
  /**
   * Forwards a channelWritabilityChanged event to the next context whose execution mask
   * includes {@code MASK_CHANNEL_WRITABILITY_CHANGED}.
   */
  @Override
  public ChannelHandlerContext fireChannelWritabilityChanged() {
    final AbstractChannelHandlerContext target = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
    invokeChannelWritabilityChanged(target);
    return this;
  }
  
  /**
   * Delivers channelWritabilityChanged to {@code next}. When a hop to another executor is
   * needed, the reusable task cached on {@code next} is submitted (created on first use).
   */
  static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (nextExecutor.inEventLoop()) {
      next.invokeChannelWritabilityChanged();
      return;
    }
    Tasks cached = next.invokeTasks;
    if (cached == null) {
      cached = new Tasks(next);
      next.invokeTasks = cached;
    }
    nextExecutor.execute(cached.invokeChannelWritableStateChangedTask);
  }
  
  /**
   * Calls the handler's channelWritabilityChanged callback, or forwards the event when the
   * handler must not be invoked yet.
   */
  private void invokeChannelWritabilityChanged() {
    if (!invokeHandler()) {
      fireChannelWritabilityChanged();
      return;
    }
    try {
      final ChannelInboundHandler inbound = (ChannelInboundHandler) handler();
      inbound.channelWritabilityChanged(this);
    } catch (Throwable failure) {
      notifyHandlerException(failure);
    }
  }
  
  /** Binds to {@code localAddress} using a freshly created promise. */
  @Override
  public ChannelFuture bind(SocketAddress localAddress) {
    final ChannelPromise promise = newPromise();
    return bind(localAddress, promise);
  }
  
  /** Connects to {@code remoteAddress} using a freshly created promise. */
  @Override
  public ChannelFuture connect(SocketAddress remoteAddress) {
    final ChannelPromise promise = newPromise();
    return connect(remoteAddress, promise);
  }
  
  /** Connects to {@code remoteAddress} from {@code localAddress} using a freshly created promise. */
  @Override
  public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    final ChannelPromise promise = newPromise();
    return connect(remoteAddress, localAddress, promise);
  }
  
  /** Requests a disconnect using a freshly created promise. */
  @Override
  public ChannelFuture disconnect() {
    final ChannelPromise promise = newPromise();
    return disconnect(promise);
  }
  
  /** Requests a close using a freshly created promise. */
  @Override
  public ChannelFuture close() {
    final ChannelPromise promise = newPromise();
    return close(promise);
  }
  
  /** Requests a deregistration using a freshly created promise. */
  @Override
  public ChannelFuture deregister() {
    final ChannelPromise promise = newPromise();
    return deregister(promise);
  }
  
  /**
   * Passes a bind request to the previous context (following {@code prev}) whose execution mask
   * includes {@code MASK_BIND}.
   *
   * @param localAddress the address to bind to; must not be {@code null}
   * @param promise      the promise to complete; returned untouched when it was already cancelled
   * @return {@code promise}
   * @throws NullPointerException if {@code localAddress} is {@code null}
   */
  @Override
  public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise, false)) {
      // The promise was cancelled, so there is nothing to do.
      return promise;
    }

    final AbstractChannelHandlerContext target = findContextOutbound(MASK_BIND);
    final EventExecutor targetExecutor = target.executor();
    if (!targetExecutor.inEventLoop()) {
      safeExecute(targetExecutor, new Runnable() {
        @Override
        public void run() {
          target.invokeBind(localAddress, promise);
        }
      }, promise, null);
      return promise;
    }
    target.invokeBind(localAddress, promise);
    return promise;
  }
  
  /**
   * Calls the handler's bind callback, or passes the request on when the handler must not be
   * invoked yet. A thrown exception fails {@code promise}.
   */
  private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (!invokeHandler()) {
      bind(localAddress, promise);
      return;
    }
    try {
      final ChannelOutboundHandler outbound = (ChannelOutboundHandler) handler();
      outbound.bind(this, localAddress, promise);
    } catch (Throwable failure) {
      notifyOutboundHandlerException(failure, promise);
    }
  }
  
  /** Connects to {@code remoteAddress} without specifying a local address. */
  @Override
  public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    final SocketAddress noLocalAddress = null;
    return connect(remoteAddress, noLocalAddress, promise);
  }
  
  /**
   * Passes a connect request to the previous context whose execution mask includes
   * {@code MASK_CONNECT}.
   *
   * @param remoteAddress the address to connect to; must not be {@code null}
   * @param localAddress  the local address to use; may be {@code null}
   * @param promise       the promise to complete; returned untouched when it was already cancelled
   * @return {@code promise}
   * @throws NullPointerException if {@code remoteAddress} is {@code null}
   */
  @Override
  public ChannelFuture connect(
      final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
    if (isNotValidPromise(promise, false)) {
      // The promise was cancelled, so there is nothing to do.
      return promise;
    }

    final AbstractChannelHandlerContext target = findContextOutbound(MASK_CONNECT);
    final EventExecutor targetExecutor = target.executor();
    if (!targetExecutor.inEventLoop()) {
      safeExecute(targetExecutor, new Runnable() {
        @Override
        public void run() {
          target.invokeConnect(remoteAddress, localAddress, promise);
        }
      }, promise, null);
      return promise;
    }
    target.invokeConnect(remoteAddress, localAddress, promise);
    return promise;
  }
  
  /**
   * Calls the handler's connect callback, or passes the request on when the handler must not be
   * invoked yet. A thrown exception fails {@code promise}.
   */
  private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    if (!invokeHandler()) {
      connect(remoteAddress, localAddress, promise);
      return;
    }
    try {
      final ChannelOutboundHandler outbound = (ChannelOutboundHandler) handler();
      outbound.connect(this, remoteAddress, localAddress, promise);
    } catch (Throwable failure) {
      notifyOutboundHandlerException(failure, promise);
    }
  }
  
  /**
   * Passes a disconnect request to the previous context whose execution mask includes
   * {@code MASK_DISCONNECT}. When the channel metadata reports no disconnect support, the
   * request is turned into a close instead.
   *
   * @return {@code promise} (or the result of {@link #close(ChannelPromise)} in the fallback case)
   */
  @Override
  public ChannelFuture disconnect(final ChannelPromise promise) {
    final boolean supportsDisconnect = channel().metadata().hasDisconnect();
    if (!supportsDisconnect) {
      // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
      // So far, UDP/IP is the only transport that has such behavior.
      return close(promise);
    }
    if (isNotValidPromise(promise, false)) {
      // The promise was cancelled, so there is nothing to do.
      return promise;
    }

    final AbstractChannelHandlerContext target = findContextOutbound(MASK_DISCONNECT);
    final EventExecutor targetExecutor = target.executor();
    if (!targetExecutor.inEventLoop()) {
      safeExecute(targetExecutor, new Runnable() {
        @Override
        public void run() {
          target.invokeDisconnect(promise);
        }
      }, promise, null);
      return promise;
    }
    target.invokeDisconnect(promise);
    return promise;
  }
  
  /**
   * Calls the handler's disconnect callback, or passes the request on when the handler must not
   * be invoked yet. A thrown exception fails {@code promise}.
   */
  private void invokeDisconnect(ChannelPromise promise) {
    if (!invokeHandler()) {
      disconnect(promise);
      return;
    }
    try {
      final ChannelOutboundHandler outbound = (ChannelOutboundHandler) handler();
      outbound.disconnect(this, promise);
    } catch (Throwable failure) {
      notifyOutboundHandlerException(failure, promise);
    }
  }
  
  /**
   * Passes a close request to the previous context whose execution mask includes
   * {@code MASK_CLOSE}.
   *
   * @return {@code promise}; returned untouched when it was already cancelled
   */
  @Override
  public ChannelFuture close(final ChannelPromise promise) {
    if (isNotValidPromise(promise, false)) {
      // The promise was cancelled, so there is nothing to do.
      return promise;
    }

    final AbstractChannelHandlerContext target = findContextOutbound(MASK_CLOSE);
    final EventExecutor targetExecutor = target.executor();
    if (!targetExecutor.inEventLoop()) {
      safeExecute(targetExecutor, new Runnable() {
        @Override
        public void run() {
          target.invokeClose(promise);
        }
      }, promise, null);
      return promise;
    }
    target.invokeClose(promise);
    return promise;
  }
  
  /**
   * Calls the handler's close callback, or passes the request on when the handler must not be
   * invoked yet. A thrown exception fails {@code promise}.
   */
  private void invokeClose(ChannelPromise promise) {
    if (!invokeHandler()) {
      close(promise);
      return;
    }
    try {
      final ChannelOutboundHandler outbound = (ChannelOutboundHandler) handler();
      outbound.close(this, promise);
    } catch (Throwable failure) {
      notifyOutboundHandlerException(failure, promise);
    }
  }
  
  /**
   * Passes a deregister request to the previous context whose execution mask includes
   * {@code MASK_DEREGISTER}.
   *
   * @return {@code promise}; returned untouched when it was already cancelled
   */
  @Override
  public ChannelFuture deregister(final ChannelPromise promise) {
    if (isNotValidPromise(promise, false)) {
      // The promise was cancelled, so there is nothing to do.
      return promise;
    }

    final AbstractChannelHandlerContext target = findContextOutbound(MASK_DEREGISTER);
    final EventExecutor targetExecutor = target.executor();
    if (!targetExecutor.inEventLoop()) {
      safeExecute(targetExecutor, new Runnable() {
        @Override
        public void run() {
          target.invokeDeregister(promise);
        }
      }, promise, null);
      return promise;
    }
    target.invokeDeregister(promise);
    return promise;
  }
  
  /**
   * Calls the handler's deregister callback, or passes the request on when the handler must not
   * be invoked yet. A thrown exception fails {@code promise}.
   */
  private void invokeDeregister(ChannelPromise promise) {
    if (!invokeHandler()) {
      deregister(promise);
      return;
    }
    try {
      final ChannelOutboundHandler outbound = (ChannelOutboundHandler) handler();
      outbound.deregister(this, promise);
    } catch (Throwable failure) {
      notifyOutboundHandlerException(failure, promise);
    }
  }
  
  /**
   * Passes a read request to the previous context whose execution mask includes
   * {@code MASK_READ}. When a hop to another executor is needed, the reusable task cached on
   * the target is submitted (created on first use).
   */
  @Override
  public ChannelHandlerContext read() {
    final AbstractChannelHandlerContext target = findContextOutbound(MASK_READ);
    final EventExecutor targetExecutor = target.executor();
    if (targetExecutor.inEventLoop()) {
      target.invokeRead();
      return this;
    }
    Tasks cached = target.invokeTasks;
    if (cached == null) {
      cached = new Tasks(target);
      target.invokeTasks = cached;
    }
    targetExecutor.execute(cached.invokeReadTask);
    return this;
  }
  
  /**
   * Calls the handler's read callback, or passes the request on when the handler must not be
   * invoked yet. A thrown exception is reported through {@link #notifyHandlerException(Throwable)}.
   */
  private void invokeRead() {
    if (!invokeHandler()) {
      read();
      return;
    }
    try {
      final ChannelOutboundHandler outbound = (ChannelOutboundHandler) handler();
      outbound.read(this);
    } catch (Throwable failure) {
      notifyHandlerException(failure);
    }
  }
  
  /** Writes {@code msg} using a freshly created promise. */
  @Override
  public ChannelFuture write(Object msg) {
    final ChannelPromise promise = newPromise();
    return write(msg, promise);
  }
  
  /**
   * Writes {@code msg} without flushing.
   *
   * @return {@code promise}
   */
  @Override
  public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    final boolean flush = false;
    write(msg, flush, promise);
    return promise;
  }
  
  /**
   * Hands {@code msg} to this context's handler, or passes the write on when the handler must
   * not be invoked yet.
   */
  private void invokeWrite(Object msg, ChannelPromise promise) {
    if (!invokeHandler()) {
      write(msg, promise);
      return;
    }
    invokeWrite0(msg, promise);
  }
  
  /** Calls the handler's write callback; a thrown exception fails {@code promise}. */
  private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
      final ChannelOutboundHandler outbound = (ChannelOutboundHandler) handler();
      outbound.write(this, msg, promise);
    } catch (Throwable failure) {
      notifyOutboundHandlerException(failure, promise);
    }
  }
  
  /**
   * Passes a flush request to the previous context whose execution mask includes
   * {@code MASK_FLUSH}. When a hop to another executor is needed, the reusable flush task
   * cached on the target is submitted through {@code safeExecute} with the channel's void promise.
   */
  @Override
  public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext target = findContextOutbound(MASK_FLUSH);
    final EventExecutor targetExecutor = target.executor();
    if (targetExecutor.inEventLoop()) {
      target.invokeFlush();
      return this;
    }
    Tasks cached = target.invokeTasks;
    if (cached == null) {
      cached = new Tasks(target);
      target.invokeTasks = cached;
    }
    safeExecute(targetExecutor, cached.invokeFlushTask, channel().voidPromise(), null);
    return this;
  }
  
  /**
   * Flushes through this context's handler, or passes the flush on when the handler must not
   * be invoked yet.
   */
  private void invokeFlush() {
    if (!invokeHandler()) {
      flush();
      return;
    }
    invokeFlush0();
  }
  
  /**
   * Calls the handler's flush callback; a thrown exception is reported through
   * {@link #notifyHandlerException(Throwable)}.
   */
  private void invokeFlush0() {
    try {
      final ChannelOutboundHandler outbound = (ChannelOutboundHandler) handler();
      outbound.flush(this);
    } catch (Throwable failure) {
      notifyHandlerException(failure);
    }
  }
  
  /**
   * Writes {@code msg} and requests a flush as part of the same operation.
   *
   * @return {@code promise}
   */
  @Override
  public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    final boolean flush = true;
    write(msg, flush, promise);
    return promise;
  }
  
  /**
   * Writes and then flushes through this context's handler, or passes the combined operation
   * on when the handler must not be invoked yet.
   */
  private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (!invokeHandler()) {
      writeAndFlush(msg, promise);
      return;
    }
    invokeWrite0(msg, promise);
    invokeFlush0();
  }
  
  /**
   * Shared implementation of write and writeAndFlush.
   *
   * @param msg     the message to write; must not be {@code null}
   * @param flush   whether a flush should follow the write
   * @param promise the promise to complete; the void promise is accepted here
   */
  private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");
    try {
      if (isNotValidPromise(promise, true)) {
        // The promise was cancelled: drop the message and stop.
        ReferenceCountUtil.release(msg);
        return;
      }
    } catch (RuntimeException e) {
      // Promise validation failed: release the message before propagating the error.
      ReferenceCountUtil.release(msg);
      throw e;
    }

    int wantedMask = MASK_WRITE;
    if (flush) {
      wantedMask |= MASK_FLUSH;
    }
    final AbstractChannelHandlerContext target = findContextOutbound(wantedMask);
    // Original author's note: touch(...) is used for reference counting / memory-leak detection.
    final Object touched = pipeline.touch(msg, target);
    final EventExecutor targetExecutor = target.executor();
    if (targetExecutor.inEventLoop()) {
      if (flush) {
        target.invokeWriteAndFlush(touched, promise);
      } else {
        target.invokeWrite(touched, promise);
      }
      return;
    }

    final AbstractWriteTask task = flush
        ? WriteAndFlushTask.newInstance(target, touched, promise)
        : WriteTask.newInstance(target, touched, promise);
    final boolean submitted = safeExecute(targetExecutor, task, promise, touched);
    if (!submitted) {
      // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
      // and put it back in the Recycler for re-use later.
      //
      // See https://github.com/netty/netty/issues/8343.
      task.cancel();
    }
  }
  
  /** Writes and flushes {@code msg} using a freshly created promise. */
  @Override
  public ChannelFuture writeAndFlush(Object msg) {
    final ChannelPromise promise = newPromise();
    return writeAndFlush(msg, promise);
  }
  
  /** Tries to fail {@code promise} with {@code cause}. */
  private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
    // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
    // false.
    final InternalLogger failureLogger = promise instanceof VoidChannelPromise ? null : logger;
    PromiseNotificationUtil.tryFailure(promise, cause, failureLogger);
  }
  
  /**
   * Reports an exception thrown by the handler. Normally it is delivered to this context's
   * exceptionCaught handling; when the stack trace shows it came out of an
   * {@code exceptionCaught} method it is only logged at WARN level instead.
   */
  private void notifyHandlerException(Throwable cause) {
    if (!inExceptionCaught(cause)) {
      invokeExceptionCaught(cause);
      return;
    }
    if (logger.isWarnEnabled()) {
      logger.warn(
          "An exception was thrown by a user handler " +
              "while handling an exceptionCaught event", cause);
    }
  }
  
  /**
   * Tells whether {@code cause}, or any exception in its cause chain, has a stack frame whose
   * method name is {@code exceptionCaught}.
   *
   * @param cause the exception to inspect; must not be {@code null}
   * @return {@code true} if such a frame was found, {@code false} otherwise
   */
  private static boolean inExceptionCaught(Throwable cause) {
    Throwable current = cause;
    do {
      final StackTraceElement[] frames = current.getStackTrace();
      if (frames != null) {
        for (int i = 0; i < frames.length; i++) {
          final StackTraceElement frame = frames[i];
          if (frame == null) {
            // A null entry ends the scan of this particular trace.
            break;
          }
          if ("exceptionCaught".equals(frame.getMethodName())) {
            return true;
          }
        }
      }
      current = current.getCause();
    } while (current != null);

    return false;
  }
  
  /** Creates a new promise tied to this context's channel and executor. */
  @Override
  public ChannelPromise newPromise() {
    final Channel ch = channel();
    final EventExecutor exec = executor();
    return new DefaultChannelPromise(ch, exec);
  }
  
  /** Creates a new progressive promise tied to this context's channel and executor. */
  @Override
  public ChannelProgressivePromise newProgressivePromise() {
    final Channel ch = channel();
    final EventExecutor exec = executor();
    return new DefaultChannelProgressivePromise(ch, exec);
  }
  
  /**
   * Returns a succeeded future for this context. The instance is created on first use and
   * reused on later calls.
   */
  @Override
  public ChannelFuture newSucceededFuture() {
    ChannelFuture cached = this.succeededFuture;
    if (cached != null) {
      return cached;
    }
    cached = new SucceededChannelFuture(channel(), executor());
    this.succeededFuture = cached;
    return cached;
  }
  
  /** Creates a future that is already failed with {@code cause}. */
  @Override
  public ChannelFuture newFailedFuture(Throwable cause) {
    final Channel ch = channel();
    final EventExecutor exec = executor();
    return new FailedChannelFuture(ch, exec, cause);
  }
  
  /**
   * Validates a promise supplied for an outbound operation.
   *
   * @param promise          the promise to check; must not be {@code null}
   * @param allowVoidPromise whether a {@link VoidChannelPromise} is acceptable for the operation
   * @return {@code true} if the promise was cancelled and the operation should be skipped,
   *         {@code false} if the promise is usable
   * @throws NullPointerException     if {@code promise} is {@code null}
   * @throws IllegalArgumentException if the promise is already done (and not cancelled), belongs
   *                                  to a different channel, or is of a type not allowed here
   */
  private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
    ObjectUtil.checkNotNull(promise, "promise");

    if (promise.isDone()) {
      // Check if the promise was cancelled and if so signal that the processing of the operation
      // should not be performed.
      //
      // See https://github.com/netty/netty/issues/2349
      if (!promise.isCancelled()) {
        throw new IllegalArgumentException("promise already done: " + promise);
      }
      return true;
    }

    final Channel promiseChannel = promise.channel();
    final Channel ownChannel = channel();
    if (promiseChannel != ownChannel) {
      throw new IllegalArgumentException(String.format(
          "promise.channel does not match: %s (expected: %s)", promiseChannel, ownChannel));
    }

    // Exactly DefaultChannelPromise: none of the type checks below can apply.
    if (promise.getClass() == DefaultChannelPromise.class) {
      return false;
    }

    final boolean isVoid = promise instanceof VoidChannelPromise;
    if (isVoid && !allowVoidPromise) {
      throw new IllegalArgumentException(
          StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
    }

    if (promise instanceof AbstractChannel.CloseFuture) {
      throw new IllegalArgumentException(
          StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
    }
    return false;
  }
  
  /**
   * Walks the {@code next} links, starting after this context, and returns the first context
   * whose execution mask has a bit in common with {@code mask}.
   */
  private AbstractChannelHandlerContext findContextInbound(int mask) {
    // executionMask is computed from the handler class when the context is created (see the
    // ChannelHandlerMask constants); a zero intersection means the context is skipped.
    AbstractChannelHandlerContext candidate = this.next;
    while ((candidate.executionMask & mask) == 0) {
      candidate = candidate.next;
    }
    return candidate;
  }
  
  /**
   * Walks the {@code prev} links, starting before this context, and returns the first context
   * whose execution mask has a bit in common with {@code mask}.
   */
  private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext candidate = this.prev;
    while ((candidate.executionMask & mask) == 0) {
      candidate = candidate.prev;
    }
    return candidate;
  }
  
  /** Returns the void promise of this context's channel. */
  @Override
  public ChannelPromise voidPromise() {
    final Channel ch = channel();
    return ch.voidPromise();
  }
  
  /** Marks the handler as removed by moving the state to {@code REMOVE_COMPLETE}. */
  final void setRemoved() {
    this.handlerState = REMOVE_COMPLETE;
  }
  
  /**
   * Moves the handler state to {@code ADD_COMPLETE} unless the handler was already removed.
   *
   * @return {@code true} if the state is now {@code ADD_COMPLETE}, {@code false} if the state
   *         was {@code REMOVE_COMPLETE} and was left untouched
   */
  final boolean setAddComplete() {
    while (true) {
      final int observed = handlerState;
      // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
      // The observed state is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is
      // used that is not exposing ordering guarantees.
      if (observed == REMOVE_COMPLETE) {
        return false;
      }
      if (HANDLER_STATE_UPDATER.compareAndSet(this, observed, ADD_COMPLETE)) {
        return true;
      }
      // The state changed between the read and the compare-and-set: read it again and retry.
    }
  }
  
  /** Moves the handler state from {@code INIT} to {@code ADD_PENDING}. */
  final void setAddPending() {
    final boolean transitioned = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
    // This should always be true as it MUST be called before setAddComplete() or setRemoved().
    assert transitioned;
  }
  
  /**
   * Marks the handler as added and then calls its {@code handlerAdded} callback. The callback
   * is skipped when the handler was already removed.
   */
  final void callHandlerAdded() throws Exception {
    // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
    // any pipeline events ctx.handler() will miss them because the state will not allow it.
    if (!setAddComplete()) {
      return;
    }
    handler().handlerAdded(this);
  }
  
  /**
   * Calls the handler's {@code handlerRemoved} callback when {@code handlerAdded} completed
   * before, and marks the handler as removed regardless of the outcome.
   */
  final void callHandlerRemoved() throws Exception {
    try {
      // Only call handlerRemoved(...) if we called handlerAdded(...) before.
      final boolean wasAdded = handlerState == ADD_COMPLETE;
      if (wasAdded) {
        handler().handlerRemoved(this);
      }
    } finally {
      // Mark the handler as removed in any case.
      setRemoved();
    }
  }
  
  /**
   * Best-effort check of whether {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} has
   * been called yet. Returns {@code false} when it has not, and {@code true} when it has or when
   * that cannot be determined.
   * <p>
   * On {@code false} the {@link ChannelHandler} is not invoked and the event is just forwarded.
   * This is needed because {@link DefaultChannelPipeline} may already have put the
   * {@link ChannelHandler} in the linked-list without having called
   * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} yet.
   */
  private boolean invokeHandler() {
    // Read the volatile field once and work on the copy.
    final int state = this.handlerState;
    if (state == ADD_COMPLETE) {
      return true;
    }
    return !ordered && state == ADD_PENDING;
  }
  
  /** Returns {@code true} once the handler state is {@code REMOVE_COMPLETE}. */
  @Override
  public boolean isRemoved() {
    final int state = handlerState;
    return state == REMOVE_COMPLETE;
  }
  
  /** Delegates to the channel's attribute for {@code key}. */
  @Override
  public <T> Attribute<T> attr(AttributeKey<T> key) {
    final Channel ch = channel();
    return ch.attr(key);
  }
  
  /** Delegates to the channel's {@code hasAttr} check for {@code key}. */
  @Override
  public <T> boolean hasAttr(AttributeKey<T> key) {
    final Channel ch = channel();
    return ch.hasAttr(key);
  }
  
  /**
   * Submits {@code runnable} to {@code executor}. If the submission throws, {@code promise} is
   * failed with the thrown cause and {@code msg} (when not {@code null}) is released.
   *
   * @return {@code true} if the task was submitted, {@code false} if the submission failed
   */
  private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
    try {
      executor.execute(runnable);
    } catch (Throwable cause) {
      try {
        promise.setFailure(cause);
      } finally {
        // Release the message even when failing the promise itself throws.
        if (msg != null) {
          ReferenceCountUtil.release(msg);
        }
      }
      return false;
    }
    return true;
  }
  
  /** Returns a hint that names this context as the point from which the message is handled. */
  @Override
  public String toHintString() {
    return "'" + name + "' will handle the message from this point.";
  }
  
  /** Renders this context as {@code ChannelHandlerContext(<name>, <channel>)}. */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder();
    text.append(StringUtil.simpleClassName(ChannelHandlerContext.class));
    text.append('(').append(name).append(", ").append(channel()).append(')');
    return text.toString();
  }
  
  /**
   * Base class for the recyclable tasks that carry a write (optionally followed by a flush) to
   * a context's executor. An instance holds the target context, the message and the promise
   * until it is run or cancelled, and is then handed back to its {@link Recycler} handle.
   */
  abstract static class AbstractWriteTask implements Runnable {

    // When true (the default), the task's estimated size is added to the pipeline's pending
    // outbound bytes in init(...) and subtracted again when the task runs or is cancelled.
    private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
        SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);

    // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
    private static final int WRITE_TASK_OVERHEAD =
        SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);

    // Handle used to return this instance to its Recycler.
    private final Recycler.Handle<AbstractWriteTask> handle;
    private AbstractChannelHandlerContext ctx;
    private Object msg;
    private ChannelPromise promise;
    // Size accounted for this task in init(...); 0 when size estimation is disabled.
    private int size;

    @SuppressWarnings("unchecked")
    private AbstractWriteTask(Recycler.Handle<? extends AbstractWriteTask> handle) {
      this.handle = (Recycler.Handle<AbstractWriteTask>) handle;
    }

    /**
     * Populates a task obtained from a recycler and, when size estimation is enabled, adds the
     * estimated message size plus the fixed overhead to the pipeline's pending outbound bytes.
     */
    protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
                               Object msg, ChannelPromise promise) {
      task.ctx = ctx;
      task.msg = msg;
      task.promise = promise;

      if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
        task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
        ctx.pipeline.incrementPendingOutboundBytes(task.size);
      } else {
        task.size = 0;
      }
    }

    /**
     * Undoes the pending-bytes accounting, performs the write, and recycles this task even if
     * the write throws.
     */
    @Override
    public final void run() {
      try {
        decrementPendingOutboundBytes();
        write(ctx, msg, promise);
      } finally {
        recycle();
      }
    }

    /**
     * Abandons the task without writing: undoes the pending-bytes accounting and recycles the
     * instance.
     */
    void cancel() {
      try {
        decrementPendingOutboundBytes();
      } finally {
        recycle();
      }
    }

    // Mirrors the increment done in init(...); a no-op when size estimation is disabled.
    private void decrementPendingOutboundBytes() {
      if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
        ctx.pipeline.decrementPendingOutboundBytes(size);
      }
    }

    // Clears the references and hands the instance back to the recycler.
    private void recycle() {
      // Set to null so the GC can collect them directly
      ctx = null;
      msg = null;
      promise = null;
      handle.recycle(this);
    }

    /** Performs the write on {@code ctx}; subclasses may extend this (e.g. to flush afterwards). */
    protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
      ctx.invokeWrite(msg, promise);
    }
  }
  
  /**
   * Recyclable task that performs a plain write (no flush) on a context. It is tagged with
   * {@link SingleThreadEventLoop.NonWakeupRunnable}.
   */
  static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {

    /** Pool from which instances are obtained. */
    private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
      @Override
      protected WriteTask newObject(Handle<WriteTask> recyclerHandle) {
        return new WriteTask(recyclerHandle);
      }
    };

    private WriteTask(Recycler.Handle<WriteTask> handle) {
      super(handle);
    }

    /** Obtains an instance from the pool and initialises it for the given write. */
    static WriteTask newInstance(
        AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
      final WriteTask pooled = RECYCLER.get();
      init(pooled, ctx, msg, promise);
      return pooled;
    }
  }
  
  /** Recyclable task that performs a write on a context and then flushes it. */
  static final class WriteAndFlushTask extends AbstractWriteTask {

    /** Pool from which instances are obtained. */
    private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
      @Override
      protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> recyclerHandle) {
        return new WriteAndFlushTask(recyclerHandle);
      }
    };

    private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) {
      super(handle);
    }

    /** Obtains an instance from the pool and initialises it for the given write. */
    static WriteAndFlushTask newInstance(
        AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
      final WriteAndFlushTask pooled = RECYCLER.get();
      init(pooled, ctx, msg, promise);
      return pooled;
    }

    /** Performs the inherited write and then triggers a flush on the same context. */
    @Override
    public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
      super.write(ctx, msg, promise);
      ctx.invokeFlush();
    }
  }
  
  /**
   * Holder of reusable, argument-free tasks for one context, so that handing these events to
   * another executor does not need a new {@link Runnable} each time.
   */
  private static final class Tasks {
    private final AbstractChannelHandlerContext next;
    private final Runnable invokeChannelReadCompleteTask;
    private final Runnable invokeReadTask;
    private final Runnable invokeChannelWritableStateChangedTask;
    private final Runnable invokeFlushTask;

    /**
     * @param next the context every task of this holder operates on
     */
    Tasks(final AbstractChannelHandlerContext next) {
      this.next = next;
      this.invokeChannelReadCompleteTask = new Runnable() {
        @Override
        public void run() {
          next.invokeChannelReadComplete();
        }
      };
      this.invokeReadTask = new Runnable() {
        @Override
        public void run() {
          next.invokeRead();
        }
      };
      this.invokeChannelWritableStateChangedTask = new Runnable() {
        @Override
        public void run() {
          next.invokeChannelWritabilityChanged();
        }
      };
      this.invokeFlushTask = new Runnable() {
        @Override
        public void run() {
          next.invokeFlush();
        }
      };
    }
  }
}
